package drds.plus.sql_process.optimizer.chooser;

import drds.plus.common.jdbc.Parameters;
import drds.plus.common.model.Application;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.rule_engine.rule_calculate.ColumnNameToValueSetMap;
import drds.plus.rule_engine.rule_calculate.DataNodeDataScatterInfo;
import drds.plus.sql_process.abstract_syntax_tree.ObjectCreateFactory;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.configuration.TableMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.JoinStrategy;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.*;
import drds.plus.sql_process.abstract_syntax_tree.node.dml.*;
import drds.plus.sql_process.abstract_syntax_tree.node.query.*;
import drds.plus.sql_process.optimizer.OptimizerContext;
import drds.plus.sql_process.optimizer.OptimizerException;
import drds.plus.sql_process.optimizer.chooser.share_delegate.ShareDelegate;
import drds.plus.sql_process.optimizer.cost_esitimater.Cost;
import drds.plus.sql_process.optimizer.cost_esitimater.CostEsitimaterFactory;
import drds.plus.sql_process.optimizer.pre_processor.FilterPreProcessor;
import drds.plus.sql_process.optimizer.pre_processor.SubQueryPreProcessor;
import drds.plus.sql_process.utils.DnfFilters;
import drds.plus.sql_process.utils.OptimizerUtils;
import drds.plus.util.ExtraCmd;
import drds.tools.ShouldNeverHappenException;

import java.util.*;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * <pre>
 * 1. 根据Rule计算分库分表，并设置执行计划的executeOn()
 * 2. 如果存在多个执行目标库，构造为Merge查询树
 * </pre>
 */
public class Router {

    private static final String data_node_undecided = "data_node_undecided";
    private static Pattern suffixPattern = Pattern.compile("\\d+$"); // 提取字符串最后的数字

    public static drds.plus.sql_process.abstract_syntax_tree.node.Node route(drds.plus.sql_process.abstract_syntax_tree.node.Node node, Parameters parameters, Map<String, Object> extraCmd) {
        try {
            // 针对非batch，绑定变量后，优化语法树
            if (parameters == null || parameters != null && !parameters.isBatch()) {
                node.assignment(parameters);// null也要做一次，可能存在nextval
                // 绑定变量后，再做一次
                if (node instanceof Dml) {
                    ((Dml) node).setWhere((TableQuery) FilterPreProcessor.optimize(((Dml) node).getWhere(), false, extraCmd));
                } else if (node instanceof Query) {
                    node = FilterPreProcessor.optimize(((Query) node), false, extraCmd);
                }
            }
            if (node instanceof Dml) {
                if (node instanceof Insert) {
                    return shardInsert((Insert) node, parameters, extraCmd);
                }
                if (node instanceof Replace) {
                    return shardReplace((Replace) node, parameters, extraCmd);
                }
                if (node instanceof Update) {
                    return shardUpdate((Update) node, parameters, extraCmd);
                }
                if (node instanceof Delete) {
                    return shardDelete((Delete) node, parameters, extraCmd);
                }
            } else if (node instanceof Query) {
                return shardQuery((Query) node, parameters, extraCmd, true);
            }

        } catch (EmptyResultFilterException e) {
            e.setNode(node);
            throw e;
        }

        return node;
    }

    /**
     * 根据逻辑表名和执行条件计算出执行节点
     */
    private static List<DataNodeDataScatterInfo> route(String logicalName, Filter filter, boolean isWrite, Map<String, Object> extraCmd) {
        boolean forceAllowFullTableScan = ExtraCmd.getExtraCmdBoolean(extraCmd, ConnectionProperties.ALLOW_FULL_TABLE_SCAN, false);
        String tableName = logicalName.split("\\.")[0];
        OptimizerContext.getOptimizerContext().getSchemaManager().getTableMetaData(tableName); // 验证下表是否存在
        List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = OptimizerContext.getOptimizerContext().getRouteOptimizer().route(logicalName, filter, isWrite, forceAllowFullTableScan);
        if (dataNodeDataScatterInfoList == null || dataNodeDataScatterInfoList.isEmpty()) {
            throw new EmptyResultFilterException();
        }
        return dataNodeDataScatterInfoList;
    }

    private static boolean isBroadcast(String logicalName) {
        String tableName = logicalName.split("\\.")[0];
        return OptimizerContext.getOptimizerContext().getRouteOptimizer().isBroadCast(tableName);
    }

    private static boolean chooseBroadcastWrite(Map<String, Object> extraCmd) {
        return ExtraCmd.getExtraCmdBoolean(extraCmd, ConnectionProperties.CHOOSE_BROADCAST_WRITE, true);
    }


    /**
     * <pre>
     *   ========================================================================================================================
     *   更新操作
     *   ========================================================================================================================
     * </pre>
     */

    /**
     * 这个是针对DML节点广播表特殊处理
     */
    private static MergeQuery buildDmlMergeBroadcast(Dml dml) {
        Application application = OptimizerContext.getOptimizerContext().getApplication();
        List<String> dataNodeIdList = new ArrayList<String>();
        for (drds.plus.common.model.DataNode dataNode : application.getDataNodeList()) {
            dataNodeIdList.add(dataNode.getId());
        }
        String dataNodeId = dml.getDataNodeId();
        dataNodeIdList.remove(dataNodeId);
        //
        MergeQuery mergeQuery = new MergeQuery();
        mergeQuery.setDmlByBroadcast(true);
        mergeQuery.merge(dml);//本身
        for (String $dataNodeId : dataNodeIdList) {
            Dml dml1 = (Dml) dml.deepCopy();//其他的都是deepCopy,dataNodeIdList.remove(dataNodeId);
            setDataNodeId(dml1, $dataNodeId);
            dml1.setDataNodeId($dataNodeId);
            mergeQuery.merge(dml1);
        }
        mergeQuery.setDataNodeId(dataNodeId);
        return mergeQuery;
    }

    private static drds.plus.sql_process.abstract_syntax_tree.node.Node shardInsert(Insert insert1, Parameters parameters, Map<String, Object> extraCmd) {
        String indexName = null;
        if (insert1.getWhere() instanceof TableQueryWithIndex) {
            indexName = ((TableQueryWithIndex) insert1.getWhere()).getIndexName();
        } else {
            indexName = insert1.getWhere().getTableMetaData().getPrimaryKeyIndexMetaData().getIndexName();
        }
        boolean broadcastAndChooseBroadcastWrite = isBroadcast(indexName) && chooseBroadcastWrite(extraCmd);
        //
        // 处理batch
        if (parameters != null && parameters.isBatch()) {
            if (insert1.getQuery() != null) {
                throw new OptimizerException("insert selectStatement not support in batch");
            }
            Map<List<String>, List<Integer>> dataNodeIdAndTableNameListToBatchIndexListMap = new HashMap<List<String>, List<Integer>>();
            for (int index = 0; index < parameters.getBatchSize(); index++) {
                // 做一下绑定变量
                insert1.assignment(parameters.cloneByBatchIndex(index));
                // 根据规则计算
                Filter insertFilter = createEqualFilter(insert1.getColumnNameList(), insert1.getColumnValueList());
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, insertFilter, true, extraCmd);
                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("insert not support muti realTableNamesStringSet, parameter is " + parameters);
                } else {
                    // 构造key
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    List<Integer> batchIndexList = dataNodeIdAndTableNameListToBatchIndexListMap.get(dataNodeIdAndTableNameList);
                    if (batchIndexList == null) {
                        batchIndexList = new ArrayList<Integer>();
                        dataNodeIdAndTableNameListToBatchIndexListMap.put(dataNodeIdAndTableNameList, batchIndexList);
                    }
                    batchIndexList.add(index);
                }
            }
            List<Insert> insertList = new ArrayList<Insert>();
            for (Map.Entry<List<String>, List<Integer>> entry : dataNodeIdAndTableNameListToBatchIndexListMap.entrySet()) {
                Insert $insertNode = buildInsert(insert1, indexName, entry.getKey().get(0), entry.getKey().get(1));
                $insertNode.setBatchIndexList(entry.getValue());
                insertList.add($insertNode);
            }
            if (insertList.size() > 1) {
                // 构造merge
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : insertList) {
                    mergeQuery.merge(node);
                }
                mergeQuery.setDataNodeId(insertList.get(0).getDataNodeId());
                mergeQuery.setExtra(insertList.get(0).getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else if (broadcastAndChooseBroadcastWrite) {
                // 处理广播表
                return buildDmlMergeBroadcast(insertList.get(0));
            } else {
                return insertList.get(0);
            }
        } else if (insert1.isValueListList()) {// 处理下insert多value
            if (insert1.getQuery() != null) {
                throw new OptimizerException("insert selectStatement not support in multi columnValueList chars");
            }
            int valueListListSize = insert1.getValueListListSize();
            Map<List<String>, List<List<Object>>> dataNodeIdAndTableNameListToValueListListMap = new HashMap<List<String>, List<List<Object>>>();
            for (int i = 0; i < valueListListSize; i++) {
                // 根据规则计算
                List<Object> valueList = insert1.getValueList(i);
                Filter filter = createEqualFilter(insert1.getColumnNameList(), insert1.getValueList(i));
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, filter, true, extraCmd);
                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("insert must contain all routes columnNameList");
                } else {
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    List<List<Object>> valueListList = dataNodeIdAndTableNameListToValueListListMap.get(dataNodeIdAndTableNameList);
                    if (valueListList == null) {
                        valueListList = new ArrayList<List<Object>>();
                        dataNodeIdAndTableNameListToValueListListMap.put(dataNodeIdAndTableNameList, valueListList);
                    }
                    valueListList.add(valueList);
                }
            }
            if (dataNodeIdAndTableNameListToValueListListMap.size() > 1) {
                MergeQuery mergeQuery = new MergeQuery();
                for (Map.Entry<List<String>, List<List<Object>>> entry : dataNodeIdAndTableNameListToValueListListMap.entrySet()) {
                    List<String> dataNodeIdAndTableNameList = entry.getKey();
                    Insert $insertNode = buildInsert(insert1, indexName, dataNodeIdAndTableNameList.get(0), dataNodeIdAndTableNameList.get(1));//不同的节点
                    $insertNode.setValueListList(entry.getValue());
                    mergeQuery.merge($insertNode);
                }
                mergeQuery.setDataNodeId(mergeQuery.getFirstSubNodeQueryNode().getDataNodeId());
                mergeQuery.setExtra(mergeQuery.getFirstSubNodeQueryNode().getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else {
                List<String> dataNodeIdAndTableNameList = dataNodeIdAndTableNameListToValueListListMap.keySet().iterator().next();
                Insert $insert = buildInsert(insert1, indexName, dataNodeIdAndTableNameList.get(0), dataNodeIdAndTableNameList.get(1));
                if (broadcastAndChooseBroadcastWrite) {
                    // 处理广播表
                    return buildDmlMergeBroadcast($insert);
                } else {
                    return $insert;
                }
            }
        } else {
            if (insert1.getQuery() == null) {
                // 根据规则计算
                Filter filter = createEqualFilter(insert1.getColumnNameList(), insert1.getColumnValueList());
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, filter, true, extraCmd);


                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("insert not support muti realTableNamesStringSet");
                } else {
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    insert1.setDataNodeId(dataNodeDataScatterInfo.getDataNodeId());
                    Insert $insertNode = buildInsert(insert1, indexName, dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    if (broadcastAndChooseBroadcastWrite) {
                        // 处理广播表
                        return buildDmlMergeBroadcast($insertNode);
                    } else {
                        return $insertNode;
                    }
                }
            } else {
                Query insert = shardQuery(insert1.getWhere(), parameters, extraCmd, true);
                Query select = shardQuery(insert1.getQuery(), parameters, extraCmd, true);
                // [1]insert query mode, 需要判断分库分表是否完全一致
                JoinInfo joinInfo = isInsertOrReplaceWithSelectJoinNodeShardRuleInfo(insert1.getWhere(), insert1.getQuery());
                if (joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable || aNodeIsSingleDataBaseSingleTableAndANodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(insert, select)) {//[2]
                    // 如果join group相同，则继续判断
                    drds.plus.sql_process.abstract_syntax_tree.node.Node merge = buildMergeInsertOrReplaceWithSelect(insert1, insert, select, extraCmd);
                    if (merge != null) {
                        /**
                         * broadcast:insertNode.getWhere().broadcast
                         */
                        if (broadcastAndChooseBroadcastWrite && select.isBroadcast()) { // 都是广播表-进行广播表优化
                            // 如果是广播表，一定是单表
                            insert1.setWhere((TableQuery) insert);
                            insert1.setQuery(select);
                            insert1.setDataNodeId(insert.getDataNodeId());
                            return buildDmlMergeBroadcast(insert1);
                        } else {
                            return merge;
                        }
                    }
                }
                //[3] 如果满足单库单表的逻辑(不能包含merge节点，可以包含query节点)，并且大家都在一个库上
                String insertDataNodeId = checkAndGetOnlyOneDataNodeId(insert);
                String selectDataNodeId = checkAndGetOnlyOneDataNodeId(select);
                if (insertDataNodeId != null && selectDataNodeId != null && insertDataNodeId.equals(selectDataNodeId)) {
                    insert1.setWhere((TableQuery) insert);
                    insert1.setQuery(select);
                    insert1.setDataNodeId(insert.getDataNodeId());
                    return insert1;
                }
                throw new OptimizerException("insert selectStatement not support cross db");

            }
        }
    }

    private static Insert buildInsert(Insert insert, String indexName, String dataNodeId, String tableName) {
        TableQueryWithIndex tableQueryWithIndex = new TableQueryWithIndex(indexName);
        // 设置为物理的表
        tableQueryWithIndex.setDataNodeId(dataNodeId);
        tableQueryWithIndex.setActualTableName(tableName);
        //
        Insert copySelf = insert.copySelf();
        copySelf.setWhere(tableQueryWithIndex);
        copySelf.setDataNodeId(tableQueryWithIndex.getDataNodeId());
        copySelf.build();
        return copySelf;
    }

    private static drds.plus.sql_process.abstract_syntax_tree.node.Node shardReplace(Replace replace, Parameters parameters, Map<String, Object> extraCmd) {
        String indexName = null;
        if (replace.getWhere() instanceof TableQueryWithIndex) {
            indexName = ((TableQueryWithIndex) replace.getWhere()).getIndexName();
        } else {
            indexName = replace.getWhere().getTableMetaData().getPrimaryKeyIndexMetaData().getIndexName();
        }
        boolean broadcast = isBroadcast(indexName) && chooseBroadcastWrite(extraCmd);
        // 处理batch
        if (parameters != null && parameters.isBatch()) {
            if (replace.getQuery() != null) {
                throw new OptimizerException("replace insert selectStatement not support in batch");
            }
            Map<List<String>, List<Integer>> dataNodeIdAndTableNameListToBatchIndexListMap = new HashMap<List<String>, List<Integer>>();
            for (int index = 0; index < parameters.getBatchSize(); index++) {
                // 做一下绑定变量
                replace.assignment(parameters.cloneByBatchIndex(index));
                // 根据规则计算
                Filter filter = createEqualFilter(replace.getColumnNameList(), replace.getColumnValueList());
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, filter, true, extraCmd);

                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("replace must contain all routes columnNameList");
                } else {
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    List<Integer> batchIndexList = dataNodeIdAndTableNameListToBatchIndexListMap.get(dataNodeIdAndTableNameList);
                    if (batchIndexList == null) {
                        batchIndexList = new ArrayList<Integer>();
                        dataNodeIdAndTableNameListToBatchIndexListMap.put(dataNodeIdAndTableNameList, batchIndexList);
                    }
                    batchIndexList.add(index);
                }
            }
            List<Replace> replaceNodeList = new ArrayList<Replace>();
            for (Map.Entry<List<String>, List<Integer>> entry : dataNodeIdAndTableNameListToBatchIndexListMap.entrySet()) {
                Replace $replace = buildReplace(replace, indexName, entry.getKey().get(0), entry.getKey().get(1));
                $replace.setBatchIndexList(entry.getValue());
                replaceNodeList.add($replace);
            }
            if (replaceNodeList.size() > 1) {
                // 构造merge
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : replaceNodeList) {
                    mergeQuery.merge(node);
                }
                mergeQuery.setDataNodeId(replaceNodeList.get(0).getDataNodeId());
                mergeQuery.setExtra(replaceNodeList.get(0).getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else if (broadcast) {
                // 处理广播表
                return buildDmlMergeBroadcast(replaceNodeList.get(0));
            } else {
                return replaceNodeList.get(0);
            }
        } else if (replace.isValueListList()) { // 处理下insert多value
            if (replace.getQuery() != null) {
                throw new OptimizerException("replace insert selectStatement not support in batch");
            }
            int valueListListSize = replace.getValueListListSize();
            Map<List<String>, List<List<Object>>> dataNodeIdAndTableNameListToValueListListMap = new HashMap<List<String>, List<List<Object>>>();
            for (int i = 0; i < valueListListSize; i++) {
                // 根据规则计算
                List<Object> valueList = replace.getValueList(i);
                Filter filter = createEqualFilter(replace.getColumnNameList(), replace.getValueList(i));
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, filter, true, extraCmd);
                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("add not support muti realTableNamesStringSet");
                } else {
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    List<List<Object>> valueListList = dataNodeIdAndTableNameListToValueListListMap.get(dataNodeIdAndTableNameList);
                    if (valueListList == null) {
                        valueListList = new ArrayList<List<Object>>();
                        dataNodeIdAndTableNameListToValueListListMap.put(dataNodeIdAndTableNameList, valueListList);
                    }
                    valueListList.add(valueList);
                }
            }
            if (dataNodeIdAndTableNameListToValueListListMap.size() > 1) {
                // 构造merge
                MergeQuery mergeQuery = new MergeQuery();
                for (Map.Entry<List<String>, List<List<Object>>> entry : dataNodeIdAndTableNameListToValueListListMap.entrySet()) {
                    List<String> dataNodeIdAndTableNameList = entry.getKey();
                    Replace $replaceNode = buildReplace(replace, indexName, dataNodeIdAndTableNameList.get(0), dataNodeIdAndTableNameList.get(1));
                    $replaceNode.setValueListList(entry.getValue()); // 修改一下对应的multiValues
                    mergeQuery.merge($replaceNode);
                }
                mergeQuery.setDataNodeId(mergeQuery.getFirstSubNodeQueryNode().getDataNodeId());
                mergeQuery.setExtra(mergeQuery.getFirstSubNodeQueryNode().getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else {
                List<String> dataNodeIdAndTableNameList = dataNodeIdAndTableNameListToValueListListMap.keySet().iterator().next();
                Replace $replaceNode = buildReplace(replace, indexName, dataNodeIdAndTableNameList.get(0), dataNodeIdAndTableNameList.get(1));
                if (broadcast) {
                    // 处理广播表
                    return buildDmlMergeBroadcast($replaceNode);
                } else {
                    return $replaceNode;
                }
            }
        } else {
            if (replace.getQuery() == null) {
                // 根据规则计算
                Filter filter = createEqualFilter(replace.getColumnNameList(), replace.getColumnValueList());
                List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(indexName, filter, true, extraCmd);
                if (!(dataNodeDataScatterInfoList.size() == 1 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap() != null && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() == 1)) {
                    throw new OptimizerException("add not support muti realTableNamesStringSet");
                } else {
                    DataNodeDataScatterInfo dataNodeDataScatterInfo = dataNodeDataScatterInfoList.get(0);
                    replace.setDataNodeId(dataNodeDataScatterInfo.getDataNodeId());
                    Replace $replaceNode = buildReplace(replace, indexName, dataNodeDataScatterInfo.getDataNodeId(), dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().keySet().iterator().next());
                    if (broadcast) {
                        // 处理广播表
                        return buildDmlMergeBroadcast($replaceNode);
                    } else {
                        return $replaceNode;
                    }
                }
            } else {
                Query insert = shardQuery(replace.getWhere(), parameters, extraCmd, true);
                Query select = shardQuery(replace.getQuery(), parameters, extraCmd, true);
                //[2] insert selectStatement mode, 需要判断分库分表是否完全一致
                JoinInfo joinInfo = isInsertOrReplaceWithSelectJoinNodeShardRuleInfo(replace.getWhere(), replace.getQuery());
                if (joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable || aNodeIsSingleDataBaseSingleTableAndANodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(insert, select)) {//[2]
                    // 如果join group相同，则继续判断
                    drds.plus.sql_process.abstract_syntax_tree.node.Node merge = buildMergeInsertOrReplaceWithSelect(replace, insert, select, extraCmd);
                    /**
                     * broadcast:replaceNode.getWhere()
                     */
                    if (broadcast && select.isBroadcast()) { // 都是广播表--进行广播表优化
                        // 如果是广播表，一定是单表
                        replace.setWhere((TableQuery) insert);
                        replace.setQuery(select);
                        replace.setDataNodeId(insert.getDataNodeId());
                        return buildDmlMergeBroadcast(replace);
                    } else {
                        return merge;
                    }
                }
                //[3] 如果满足单库单表的逻辑(不能包含merge节点，可以包含query节点)，并且大家都在一个库上
                String insertDataNodeId = checkAndGetOnlyOneDataNodeId(insert);
                String selectDataNodeId = checkAndGetOnlyOneDataNodeId(select);
                if (insertDataNodeId != null && selectDataNodeId != null && insertDataNodeId.equals(selectDataNodeId)) {
                    replace.setWhere((TableQuery) insert);
                    replace.setQuery(select);
                    replace.setDataNodeId(insert.getDataNodeId());
                    return replace;
                }
                throw new OptimizerException("insert selectStatement not support cross db");
            }
        }
    }

    private static Replace buildReplace(Replace replace, String indexName, String dataNodeId, String tableName) {
        TableQueryWithIndex tableQueryWithIndex = new TableQueryWithIndex(indexName);
        // 设置为物理的表
        tableQueryWithIndex.setDataNodeId(dataNodeId);
        tableQueryWithIndex.setActualTableName(tableName);

        Replace copySelf = replace.copySelf();
        copySelf.setWhere(tableQueryWithIndex);
        copySelf.setDataNodeId(tableQueryWithIndex.getDataNodeId());
        copySelf.build();
        return copySelf;
    }

    private static Filter createEqualFilter(List<Item> itemList, List<Object> valueList) {
        Filter insertFilter = null;
        if (itemList.size() == 1) {
            BooleanFilter booleanFilter = ObjectCreateFactory.createBooleanFilter();
            booleanFilter.setOperation(Operation.equal);
            booleanFilter.setColumn(itemList.get(0));
            booleanFilter.setValue(valueList.get(0));
            insertFilter = booleanFilter;
        } else {
            LogicalOperationFilter and = ObjectCreateFactory.createLogicalOperationFilter();
            and.setOperation(Operation.and);
            for (int i = 0; i < itemList.size(); i++) {
                Comparable comparable = itemList.get(i);
                BooleanFilter booleanFilter = ObjectCreateFactory.createBooleanFilter();
                booleanFilter.setOperation(Operation.equal);
                booleanFilter.setColumn(comparable);
                booleanFilter.setValue(valueList.get(i));
                and.addFilter(booleanFilter);
            }

            insertFilter = and;
        }
        return insertFilter;
    }

    /**
     * 将insert...select的节点构造为merge.[两个表节点 可能是广播表或者MergeQueryNode其中一种情况]
     *
     * <pre>
     * 注意点： 1. 针对join group相同而分区不同的，则相信配置是正确的，不会再做left/right分区是否一致的check 2. insert
     * ... query xx where id =
     * 3，构建出来的insert是一个全表扫描，而select会是id=3的分区，分区条件也不会是一致
     */
    private static drds.plus.sql_process.abstract_syntax_tree.node.Node buildMergeInsertOrReplaceWithSelect(Dml dml, Query leftQuery, Query rightQuery, Map<String, Object> extraCmd) {
        // 根据表名的后缀来判断两个表是不是对应的表
        // 底下的节点已经被优先处理
        // 1. 如果是KvIndexNode，没必要转化为, 直接返回 即可
        // 2. 如果是QueryNode，底下已经将其转化为merge下套query+child
        // 所以，只要处理MergeNode即可

        // 因为insert...query，分库条件只在select中存在，所以需要以select为驱动表
        // 如果insert中分区表在本次select中不存在，则忽略当前insert的分区表
        // 也就是insert ... query xx where id = 3,
        // 只会构建出一个id=3分区下的insert select节点.


        //
        List<Query> leftQueryList = new ArrayList<Query>();
        List<Query> rightQueryList = new ArrayList<Query>();
        Map<String, Query> rightQueryToExtraMap = new HashMap<String, Query>();
        //
        boolean leftQueryIsBroadcast = false;
        boolean rightQueryIsBroadcast = false;
        if (leftQuery instanceof MergeQuery) {
            for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : leftQuery.getNodeList()) {
                leftQueryList.add((Query) node);
            }
        } else {
            // 可能是广播表
            leftQueryIsBroadcast = leftQuery.isBroadcast();
        }
        if (rightQuery instanceof MergeQuery) {
            for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : rightQuery.getNodeList()) {
                rightQueryToExtraMap.put((String) node.getExtra(), (Query) node);
                rightQueryList.add((Query) node);
            }
        } else {
            // 可能是广播表
            rightQueryIsBroadcast = rightQuery.isBroadcast();
        }
        // 非广播表，并且存在聚合计算，不能展开
        if (!leftQueryIsBroadcast && (leftQuery.isExistAggregateExpression() || leftQuery.getLimitFrom() != null || leftQuery.getLimitTo() != null)) {
            return null;
        }
        // 非广播表，并且存在聚合计算，不能展开
        if (!rightQueryIsBroadcast && (rightQuery.isExistAggregateExpression() || rightQuery.getLimitFrom() != null || rightQuery.getLimitTo() != null)) {
            return null;
        }
        //
        List<Dml> dmlList = new ArrayList<Dml>();
        if (leftQueryIsBroadcast && rightQueryIsBroadcast) {
            return dml;// 两个广播表之间的insert selectStatement，直接返回
        } else if (leftQueryIsBroadcast) {// 左边是广播表，会在每个分表上执行，然后广播表数据互相merge
            for (Query $rightQuery : rightQueryList) {
                //
                Query $leftQuery = leftQuery.copy();
                setDataNodeId($leftQuery, $rightQuery.getDataNodeId());// 广播表的执行节点跟着右边走
                //
                Dml $dml = dml.copySelf();
                $dml.setWhere((TableQuery) $leftQuery);
                $dml.setQuery($rightQuery);
                $dml.setDataNodeId($rightQuery.getDataNodeId());
                $dml.setExtra($rightQuery.getExtra());
                dmlList.add($dml);
            }
        } else if (rightQueryIsBroadcast) {
            for (Query $leftQuery : leftQueryList) {
                Query $rightQuery = rightQuery.copy();
                setDataNodeId($rightQuery, $leftQuery.getDataNodeId());// 广播表的执行节点跟着左边走
                //
                Dml $dml = dml.copySelf();
                $dml.setWhere((TableQuery) $leftQuery);
                $dml.setQuery($rightQuery);
                $dml.setDataNodeId($leftQuery.getDataNodeId());
                $dml.setExtra($leftQuery.getExtra());
                dmlList.add($dml);
            }
        } else {
            // 根据后缀，找到匹配的表，生成insert query
            for (Query $leftQuery : leftQueryList) {//左边为主导 右边如果不存在也不影响
                Query $rightQuery = rightQueryToExtraMap.remove($leftQuery.getExtra());//判断是否存在
                if ($rightQuery == null) {
                    continue; // 如果select中没有，则忽略当前insert，注意不是返回null
                }
                Dml $dml = dml.copySelf();
                $dml.setWhere((TableQuery) $leftQuery);
                $dml.setQuery($rightQuery);
                $dml.setDataNodeId($leftQuery.getDataNodeId());
                $dml.setExtra($leftQuery.getExtra()); // 因为left/right的extra相同，只要选择一个即可
                dmlList.add($dml);
            }
        }

        if (dmlList.size() > 1) {
            MergeQuery mergeQuery = new MergeQuery();
            for (Dml $dml : dmlList) {
                mergeQuery.merge($dml);
            }
            mergeQuery.setDataNodeId(dmlList.get(0).getDataNodeId());
            mergeQuery.setExtra(dmlList.get(0).getExtra());// last on first abstractSyntaxTreeNode
            mergeQuery.build();
            return mergeQuery;
        } else if (dmlList.size() == 1) {
            return dmlList.get(0);
        } else {
            return null;
        }
    }

    /**
     * 判断insert..select的分区规则是否一致
     */
    private static JoinInfo isInsertOrReplaceWithSelectJoinNodeShardRuleInfo(Query insertOrReplace, Query select) {
        JoinInfo leftNodeJoinInfo = isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList(insertOrReplace, new ArrayList<Item>());
        if (!leftNodeJoinInfo.theSameJoinDataNodeIdOrHasBroadcastLable) {
            return leftNodeJoinInfo;
        }
        JoinInfo rightNodeJoinInfo = isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList(select, new ArrayList<Item>());
        if (!rightNodeJoinInfo.theSameJoinDataNodeIdOrHasBroadcastLable) {
            return rightNodeJoinInfo;
        }
        JoinInfo joinInfo = new JoinInfo();
        joinInfo.hasBroadcastLable = leftNodeJoinInfo.hasBroadcastLable || rightNodeJoinInfo.hasBroadcastLable;
        joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = leftNodeJoinInfo.joinDataNodeId.equals(rightNodeJoinInfo.joinDataNodeId) || joinInfo.hasBroadcastLable;
        joinInfo.joinDataNodeId = leftNodeJoinInfo.joinDataNodeId;
        joinInfo.joinFilterList = null;
        return joinInfo;
    }

    /**
     * 找到join条件完全是分区键的filter，返回null代表没找到，否则返回join条件
     */
    private static JoinInfo isJoinItemListIsFullMatchShardColumnNameList(Join joinNode) {
        JoinInfo leftNodeJoinInfo = isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList(joinNode.getLeftNode(), joinNode.getLeftJoinItemList());
        JoinInfo rightNodeJoinInfo = isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList(joinNode.getRightNode(), joinNode.getRightJoinItemList());
        // 允许一张单表和一张广播表的组合
        if (!leftNodeJoinInfo.theSameJoinDataNodeIdOrHasBroadcastLable && !rightNodeJoinInfo.hasBroadcastLable) {
            return leftNodeJoinInfo;
        }
        if (!rightNodeJoinInfo.theSameJoinDataNodeIdOrHasBroadcastLable && !leftNodeJoinInfo.hasBroadcastLable) {
            return rightNodeJoinInfo;
        }
        JoinInfo joinInfo = new JoinInfo();
        joinInfo.hasBroadcastLable = leftNodeJoinInfo.hasBroadcastLable || rightNodeJoinInfo.hasBroadcastLable;
        joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = leftNodeJoinInfo.joinDataNodeId.equals(rightNodeJoinInfo.joinDataNodeId) || joinInfo.hasBroadcastLable;
        joinInfo.joinDataNodeId = leftNodeJoinInfo.joinDataNodeId;
        joinInfo.joinFilterList = joinNode.getJoinFilterList();
        return joinInfo;
    }

    /**
     * 判断一个joinNode的左或则右节点的joinColumns是否和当前节点的分区键完全匹配
     */
    private static JoinInfo isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList(Query query, List<Item> joinItemList) {
        JoinInfo joinInfo = new JoinInfo();
        if (query instanceof $Query$) {
            List<Item> newJoinItemList = new LinkedList<Item>();
            for (Item joinColumn : joinItemList) {
                Item newJoinItem = query.getItem(joinColumn);
                if (newJoinItem == null) {
                    return null;
                } else {
                    newJoinItemList.add(newJoinItem);
                }
            }
            // 获取当前表的分区字段
            return isLeftOrRightNodeJoinItemListIsFullMatchShardColumnNameList((Query) query.getFirstSubNodeQueryNode(), newJoinItemList);
        } else if (query instanceof MergeQuery) {
            joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;
            return joinInfo; // 直接返回，不处理
        } else if (query instanceof Join) {
            joinInfo = isJoinItemListIsFullMatchShardColumnNameList((Join) query);
            if (!joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable) {// 递归失败，直接返回
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;
                return joinInfo;
            }
            if (joinItemList.isEmpty()) { // 针对insert..query
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = true;
                return joinInfo;
            }
            List<BooleanFilter> joinFilterList = joinInfo.joinFilterList;
            if (joinFilterList == null) {// 底下不满足，直接退出（join 但是没有joinFilterList 则不能进行切分列判断）
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;
                return joinInfo;
            }
            for (Item joinColumn : joinItemList) {
                Item item = query.getItem(joinColumn);
                if (!isJoinFilterItem(joinFilterList, item)) {
                    joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;
                    return joinInfo;
                }
            }
            joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = true;
            return joinInfo;
        } else if (query instanceof TableQueryWithIndex) {
            String indexName = ((TableQueryWithIndex) query).getIndexName();
            joinInfo.joinDataNodeId = OptimizerContext.getOptimizerContext().getRouteOptimizer().getJoinDataNodeId(indexName);
            if (OptimizerContext.getOptimizerContext().getRouteOptimizer().isBroadCast(((TableQueryWithIndex) query).getIndexName())) {
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = true;
                joinInfo.hasBroadcastLable = true;
                return joinInfo;
            }
            if (joinItemList.isEmpty()) { // 如果没有join列，可能是insert...select情况
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = true;
                return joinInfo;
            }
            TableMetaData tableMetaData = ((TableQueryWithIndex) query).getTableMetaData();
            List<String> shardColumnNameList = OptimizerContext.getOptimizerContext().getRouteOptimizer().getShardColumnNameList(((TableQueryWithIndex) query).getIndexName());
            List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
            for (String shardColumn : shardColumnNameList) {//需要保证元数据存在
                ColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(shardColumn);
                if (columnMetaData == null) {
                    // 可能是一个不存在的分库字段
                    joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;
                    return joinInfo;
                }
                columnMetaDataList.add(columnMetaData);
            }
            String tableName = ((TableQueryWithIndex) query).getTableName();
            if (query.getAlias() != null) {
                tableName = query.getAlias();
            }
            List<Item> shardItemList = OptimizerUtils.columnMetaListToIColumnList(columnMetaDataList, tableName);
            if (shardItemList.isEmpty()) {
                joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;// 没有分库键
                return joinInfo;
            }
            // 要求joinItemList必须包含所有的shardItemList,即joinItemList.size()>shardItemList.size()
            for (Item shardItem : shardItemList) {
                boolean isFound = false;
                for (Item joinColumn : joinItemList) {
                    if (joinColumn.getColumnName().equals(shardItem.getColumnName())) {// partition无别名
                        isFound = true;
                        break;
                    }
                }
                if (!isFound) {
                    joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = false;// 没有分库键
                    return joinInfo;
                }
            }
            joinInfo.theSameJoinDataNodeIdOrHasBroadcastLable = true;
            return joinInfo;
        } else {
            throw new ShouldNeverHappenException();
        }
    }

    /**
     * 判断是否是join条件中的一个字段，可能是左表或右边的字段
     */
    private static boolean isJoinFilterItem(List<BooleanFilter> joinFilterList, Item item) {
        for (BooleanFilter joinFilter : joinFilterList) {
            Item leftJoinItem = (Item) joinFilter.getColumn();
            Item rightJoinItem = (Item) joinFilter.getValue();
            if (leftJoinItem.equals(item) || rightJoinItem.equals(item)) {
                return true;
            }
        }
        return false;
    }
    //

    private static drds.plus.sql_process.abstract_syntax_tree.node.Node shardUpdate(Update update, Parameters parameters, Map<String, Object> extraCmd) {

        // 处理batch
        // case1 :
        // update xxx setWhereAndSetNeedBuild id > ? and id < ?
        // 针对绑定变量：
        // a. id > 1 and id < 4 , 会得到 2, 3
        // b. id > 2 and id < 5 , 会得到 3, 4
        // 会得到3个表的batch. 2(a) 3(a,b) 4(b)
        //
        // case 2 :
        // update xxx setWhereAndSetNeedBuild id in (?,?)
        // 针对绑定变量：
        // a. id in (2, 3) , 会得到 2, 3
        // b. id in (3, 4) , 会得到 3, 4
        // 会得到3个表的batch. 2(a) 3(a,b) 4(b)
        // 注意:发送给表的sql均为 id in (?,?) ，这里不会是经过in优化的sql
        boolean isBroadcastAndChooseBroadcastWrite = isBroadcast(update.getWhere().getTableName()) && chooseBroadcastWrite(extraCmd);
        if (parameters != null && parameters.isBatch()) {
            Map<List<String>, List<Integer>> dataNodeIdAndTableNameListToBatchIndexListMap = new HashMap<List<String>, List<Integer>>();
            Query whereNode = null;
            for (int i = 0; i < parameters.getBatchSize(); i++) {
                // 做一下绑定变量
                update.assignment(parameters.cloneByBatchIndex(i));
                // 根据规则计算,不处理in优化
                Query query = shardQuery(update.getWhere(), parameters, extraCmd, false);
                List<drds.plus.sql_process.abstract_syntax_tree.node.Node> nodeList = new ArrayList<drds.plus.sql_process.abstract_syntax_tree.node.Node>();
                if (query instanceof MergeQuery) {
                    nodeList.addAll(query.getNodeList());
                } else {
                    nodeList.add(query);
                }
                // 一定会有个where query
                whereNode = (Query) nodeList.get(0);
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : nodeList) {
                    if (!(node instanceof TableQuery)) {
                        throw new UnsupportedOperationException("update中暂不支持按照索引进行查询");
                    }
                    // 构造key
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(node.getDataNodeId(), ((TableQuery) node).getActualTableName());
                    List<Integer> batchIndexList = dataNodeIdAndTableNameListToBatchIndexListMap.get(dataNodeIdAndTableNameList);
                    if (batchIndexList == null) {
                        batchIndexList = new ArrayList<Integer>();
                        dataNodeIdAndTableNameListToBatchIndexListMap.put(dataNodeIdAndTableNameList, batchIndexList);
                    }
                    batchIndexList.add(i);
                }
            }
            List<Update> updateList = new ArrayList<Update>();
            for (Map.Entry<List<String>, List<Integer>> entry : dataNodeIdAndTableNameListToBatchIndexListMap.entrySet()) {
                Update $update = buildUpdateNodeWithQuery(update, whereNode.copy(), entry.getKey().get(0), entry.getKey().get(1));
                $update.setBatchIndexList(entry.getValue());
                updateList.add($update);
            }
            if (updateList.size() > 1) {
                // 构造merge
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : updateList) {
                    mergeQuery.merge(node);
                }
                mergeQuery.setDataNodeId(updateList.get(0).getDataNodeId());
                mergeQuery.setExtra(updateList.get(0).getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else if (isBroadcastAndChooseBroadcastWrite) {
                return buildDmlMergeBroadcast(updateList.get(0));
            } else {
                return updateList.get(0);
            }
        } else {
            Query query = shardQuery(update.getWhere(), parameters, extraCmd, true);
            List<drds.plus.sql_process.abstract_syntax_tree.node.Node> nodeList = new ArrayList<drds.plus.sql_process.abstract_syntax_tree.node.Node>();
            if (query instanceof MergeQuery) {
                nodeList.addAll(query.getNodeList());
            } else {
                nodeList.add(query);
            }
            if (nodeList.size() > 1) {
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : nodeList) {
                    mergeQuery.merge(buildUpdateNodeWithQuery(update, (Query) node));
                }
                mergeQuery.setDataNodeId(mergeQuery.getFirstSubNodeQueryNode().getDataNodeId());
                mergeQuery.build();
                return mergeQuery;
            } else {
                Update $update = buildUpdateNodeWithQuery(update, (Query) nodeList.get(0));
                if (isBroadcastAndChooseBroadcastWrite) {
                    // 处理广播表
                    return buildDmlMergeBroadcast($update);
                } else {
                    return $update;
                }
            }
        }
    }

    private static drds.plus.sql_process.abstract_syntax_tree.node.Node shardDelete(Delete delete, Parameters parameters, Map<String, Object> extraCmd) {

        // 处理batch
        // case1 :
        // update xxx setWhereAndSetNeedBuild id > ? and id < ?
        // 针对绑定变量：
        // a. id > 1 and id < 4 , 会得到 2, 3
        // b. id > 2 and id < 5 , 会得到 3, 4
        // 会得到3个表的batch. 2(a) 3(a,b) 4(b)
        //
        // case 2 :
        // update xxx setWhereAndSetNeedBuild id in (?,?)
        // 针对绑定变量：
        // a. id in (2, 3) , 会得到 2, 3
        // b. id in (3, 4) , 会得到 3, 4
        // 会得到3个表的batch. 2(a) 3(a,b) 4(b)
        // 注意:发送给表的sql均为 id in (?,?) ，这里不会是经过in优化的sql
        boolean broadcast = isBroadcast(delete.getWhere().getTableName()) && chooseBroadcastWrite(extraCmd);
        if (parameters != null && parameters.isBatch()) {
            Map<List<String>, List<Integer>> dataNodeIdAndTableNameListToBatchIndexListMap = new HashMap<List<String>, List<Integer>>();
            Query whereNode = null;
            for (int i = 0; i < parameters.getBatchSize(); i++) {
                // 做一下绑定变量
                delete.assignment(parameters.cloneByBatchIndex(i));
                // 根据规则计算, batch不处理in优化
                Query query = shardQuery(delete.getWhere(), parameters, extraCmd, false);
                List<drds.plus.sql_process.abstract_syntax_tree.node.Node> nodeList = new ArrayList();
                if (query instanceof MergeQuery) {
                    nodeList.addAll(query.getNodeList());
                } else {
                    nodeList.add(query);
                }
                // 一定会有个where query
                whereNode = (Query) nodeList.get(0);
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : nodeList) {
                    if (!(node instanceof TableQuery)) {
                        throw new UnsupportedOperationException("update中暂不支持按照索引进行查询");
                    }
                    // 构造key
                    List<String> dataNodeIdAndTableNameList = Arrays.asList(node.getDataNodeId(), ((TableQuery) node).getActualTableName());
                    List<Integer> batchIndexList = dataNodeIdAndTableNameListToBatchIndexListMap.get(dataNodeIdAndTableNameList);
                    if (batchIndexList == null) {
                        batchIndexList = new ArrayList<Integer>();
                        dataNodeIdAndTableNameListToBatchIndexListMap.put(dataNodeIdAndTableNameList, batchIndexList);
                    }
                    batchIndexList.add(i);
                }
            }
            List<Delete> deleteList = new ArrayList<Delete>();
            for (Map.Entry<List<String>, List<Integer>> entry : dataNodeIdAndTableNameListToBatchIndexListMap.entrySet()) {
                Delete $delete = buildDeleteNodeWithQuery(delete, whereNode.copy(), entry.getKey().get(0), entry.getKey().get(1));
                $delete.setBatchIndexList(entry.getValue());
                deleteList.add($delete);
            }
            if (deleteList.size() > 1) {
                // 构造merge
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : deleteList) {
                    mergeQuery.merge(node);
                }
                mergeQuery.setDataNodeId(deleteList.get(0).getDataNodeId());
                mergeQuery.setExtra(deleteList.get(0).getExtra());
                mergeQuery.build();
                return mergeQuery;
            } else if (broadcast) {
                return buildDmlMergeBroadcast(deleteList.get(0));
            } else {
                return deleteList.get(0);
            }
        } else {
            Query query = shardQuery(delete.getWhere(), parameters, extraCmd, true);
            List<drds.plus.sql_process.abstract_syntax_tree.node.Node> nodeList = new ArrayList();
            if (query instanceof MergeQuery) {
                nodeList.addAll(query.getNodeList());
            } else {
                nodeList.add(query);
            }
            if (nodeList.size() > 1) {
                MergeQuery mergeQuery = new MergeQuery();
                for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : nodeList) {
                    mergeQuery.merge(buildDeleteNodeWithQuery(delete, (Query) node));
                }
                mergeQuery.setDataNodeId(mergeQuery.getFirstSubNodeQueryNode().getDataNodeId());
                mergeQuery.build();
                return mergeQuery;
            } else {
                Delete $deleteNode = buildDeleteNodeWithQuery(delete, (Query) nodeList.get(0));
                if (broadcast) {
                    // 处理广播表
                    return buildDmlMergeBroadcast($deleteNode);
                } else {
                    return $deleteNode;
                }
            }
        }
    }


    private static Update buildUpdateNodeWithQuery(Update update, Query query, String dataNodeId, String tableName) {
        if (query instanceof TableQuery) {
            // 不能使用QueryTreeNode.update()生成node
            // 因为使用了代理模式后，update中会将this做为构造参数，而this并不是生成的代理类对象
            query.setDataNodeId(dataNodeId);
            ((TableQuery) query).setActualTableName(tableName);
            return buildUpdateNodeWithQuery(update, query);
        } else {
            throw new UnsupportedOperationException("update中暂不支持按照索引进行查询");
        }
    }

    private static Update buildUpdateNodeWithQuery(Update update, Query query) {
        Update copySelf = update.copySelf();
        copySelf.setWhere((TableQuery) query);
        copySelf.setDataNodeId(query.getDataNodeId());
        return copySelf;
    }

    private static Delete buildDeleteNodeWithQuery(Delete delete, Query query, String dataNodeId, String tableName) {
        if (query instanceof TableQuery) {
            // 不能使用QueryTreeNode.update()生成node
            // 因为使用了代理模式后，update中会将this做为构造参数，而this并不是生成的代理类对象
            query.setDataNodeId(dataNodeId);
            ((TableQuery) query).setActualTableName(tableName);

            return buildDeleteNodeWithQuery(delete, query);
        } else {
            throw new UnsupportedOperationException("delete中暂不支持按照索引进行查询");
        }
    }

    private static Delete buildDeleteNodeWithQuery(Delete delete, Query query) {
        Delete copySelf = delete.copySelf();
        copySelf.setWhere((TableQuery) query);
        copySelf.setDataNodeId(query.getDataNodeId());
        return copySelf;
    }


    /**
     * <pre>
     *   ========================================================================================================================
     *   查询
     *   ========================================================================================================================
     * </pre>
     */
    private static Query shardQuery(Query query, Parameters parameters, Map<String, Object> extraCmd, boolean traceIn) {
        List<Function> functionList = SubQueryPreProcessor.findSubQueryFunctionList(query, false);
        // filter中存在子查询，提前处理
        for (Function function : functionList) {
            Query subQuery = (Query) function.getArgList().get(0);
            // 非correlated subquery理论上在之前都已经被提前计算过，并进行了assignment替换
            subQuery = shardQuery(subQuery, parameters, extraCmd, traceIn);
            function.getArgList().set(0, subQuery);
        }

        if (query instanceof $Query$) {
            $Query$ $Query$ = ($Query$) query;
            Query firstQuery = $Query$.getFirstSubNodeQueryNode();
            firstQuery = shardQuery(firstQuery, parameters, extraCmd, traceIn);
            // 比如select * from (比如select * from a where xx > xx group by b) set xx > xx limit a,b
            // 子查询如果存在聚合函数，不可下推
            // 子查询如果存在from/to，也不可下推.(主要考虑：如果query本身有过滤条件，是在merge查询的基础上过滤，limit之后。如果下推就是limit之前)
            // 父查询如果存在聚合函数，也不可下推
            if (firstQuery instanceof MergeQuery && !($Query$.isExistAggregateExpression() || firstQuery.isExistAggregateExpression() || firstQuery.getLimitFrom() != null && firstQuery.getLimitTo() != null)) {
                return buildMergeQueryFrom$Query$($Query$, (MergeQuery) firstQuery, extraCmd);
            } else {
                $Query$.setFirstSubNodeQueryNode(firstQuery);
                $Query$.setBroadcast(firstQuery.isBroadcast());// 传递一下
                $Query$.setDataNodeId(firstQuery.getDataNodeId());
                $Query$.setExistAggregateExpression(firstQuery.isExistAggregateExpression());
                return $Query$;
            }
        } else if (query instanceof TableQuery) {
            // 此时经过join处理后，已经全部转化为kv结构的查询了
            TableQueryWithIndex tableQueryWithIndex = (TableQueryWithIndex) query;
            // 构造filter
            Filter filter = DnfFilters.and(tableQueryWithIndex.getIndexQueryKeyFilter(), tableQueryWithIndex.getResultFilter());
            filter = DnfFilters.and(filter, tableQueryWithIndex.getOtherJoinOnFilter());
            filter = DnfFilters.and(filter, tableQueryWithIndex.getSubQueryFunctionFilter());
            List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList = route(tableQueryWithIndex.getIndexName(), filter, true, extraCmd);
            return buildMergeQueryFromTableQuery(tableQueryWithIndex, dataNodeDataScatterInfoList, extraCmd, traceIn);
        } else if (query instanceof MergeQuery) {
            // 一个query出现or可能会走到index sort，会拆分为merge合并两个请求
            // 为merge选择执行节点
            // 很多方案...
            // 两路归并?
            // 都发到一台机器上？
            // 目前的方案是都发到一台机器上
            // 对Merge的每个子节点的rowCount进行排序
            // 找出rowCount最大的子节点
            // merge应该在该机器上执行，其他机器的数据都发送给它
            MergeQuery mergeQuery = (MergeQuery) query;
            List<drds.plus.sql_process.abstract_syntax_tree.node.Node> nodeList = mergeQuery.getNodeList();
            //
            mergeQuery = new MergeQuery();
            mergeQuery.setUnion(((MergeQuery) query).isUnion());
            //
            long maxRowCount = 0;
            String maxRowCountDataNodeId = nodeList.get(0).getDataNodeId();
            //
            for (int i = 0; i < nodeList.size(); i++) {
                Query $query = (Query) nodeList.get(i);
                $query = shardQuery($query, parameters, extraCmd, traceIn);
                nodeList.set(i, $query);
                Cost cost = CostEsitimaterFactory.estimate($query);
                if (cost.getKeyFilteredAndValueFilteredCount() > maxRowCount) {
                    maxRowCount = cost.getKeyFilteredAndValueFilteredCount();
                    maxRowCountDataNodeId = $query.getDataNodeId();
                }
            }
            if (maxRowCountDataNodeId == null) {
                maxRowCountDataNodeId = nodeList.get(0).getDataNodeId();
            }
            mergeQuery.setDataNodeId(maxRowCountDataNodeId);//在数据量最大的节点进行执行
            mergeQuery.merge(nodeList);
            mergeQuery.setSharded(true);
            mergeQuery.setBroadcast(false);
            mergeQuery.build();
            return mergeQuery;
        } else if (query instanceof Join) {
            // Join节点应该在数据量大的一段进行
            Join joinNode = (Join) query;
            Query leftNode = joinNode.getLeftNode();
            Query rightNode = joinNode.getRightNode();
            /*
             * 如果是分库键join分库键，并且规则相同，则优化成join sort join
             */
            boolean isJoinOnOneDataNode = false;
            if (chooseJoinMergeJoinForce(extraCmd)) {
                isJoinOnOneDataNode = true;// 强制开启
            } else if (chooseJoinMergeJoinByRule(extraCmd)) {
                isJoinOnOneDataNode = isJoinItemListIsFullMatchShardColumnNameList(joinNode).theSameJoinDataNodeIdOrHasBroadcastLable; // 根据规则判断
            }
            // 处理子节点
            leftNode = shardQuery(leftNode, parameters, extraCmd, traceIn);
            rightNode = shardQuery(rightNode, parameters, extraCmd, traceIn);
            if (isJoinOnOneDataNode || aNodeIsSingleDataBaseSingleTableAndANodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(leftNode, rightNode)) {
                // 尝试构建join sort join，可能会构建失败
                // 失败原因 :
                // 1. 人肉强制开启join sort join的选项
                // 2. 涉及index kv的join查询结构，index的数据和原始数据的分区方式可能不一致
                Query joinQueryMerge = buildJoinQueryMerge(joinNode, leftNode, rightNode, extraCmd);//进行merge展开
                if (joinQueryMerge != null) {
                    return joinQueryMerge;
                }
            }
            // 不做join sort join
            joinNode.setLeftNode(leftNode);
            // NestedLoop情况下
            // 如果右边是多个表，则分库需要再执行层根据左边的结果做
            if (!(rightNode instanceof MergeQuery && (joinNode.getJoinStrategy() == JoinStrategy.nest_loop_join || joinNode.getJoinStrategy() == JoinStrategy.index_nest_loop_join))) {
                joinNode.setRightNode(rightNode);
            } else {
                if (rightNode.isSubQuery()) {
                    // 子表会采取BLOCK_LOOP_JOIN模式，一次性取完结果
                    joinNode.setRightNode(rightNode);
                } else {
                    //该处代码逻辑是MergeQueryNode !(subQuery) (index_nest_loop_join,nest_loop_join) 运行时计算
                    rightNode = new MergeQuery();
                    rightNode.merge(joinNode.getRightNode());
                    ((MergeQuery) rightNode).setSharded(false);//下面有基于此的判断，表示需要运行时计算
                    joinNode.getRightNode().setDataNodeId(data_node_undecided);
                    rightNode.setBroadcast(false);
                    rightNode.build();
                    joinNode.setRightNode(rightNode);
                }
            }


            // 对于未决的IndexNestedLoop，join应该在左节点执行
            if (rightNode instanceof MergeQuery && !((MergeQuery) rightNode).isSharded()) {
                String dataNodeId = joinNode.getLeftNode().getDataNodeId();
                joinNode.setDataNodeId(dataNodeId);
                rightNode.setDataNodeId(joinNode.getDataNodeId());
            } else {
                // 选择一个执行代价最小的节点
                Cost leftCost = CostEsitimaterFactory.estimate(leftNode);
                Cost rightCost = CostEsitimaterFactory.estimate(rightNode);
                String dataNodeId = leftCost.getKeyFilteredAndValueFilteredCount() > rightCost.getKeyFilteredAndValueFilteredCount() ? joinNode.getLeftNode().getDataNodeId() : joinNode.getRightNode().getDataNodeId();
                joinNode.setDataNodeId(dataNodeId);
            }
            joinNode.setBroadcast(leftNode.isBroadcast() && rightNode.isBroadcast());
            return joinNode;
        }

        return query;
    }
    //[$Query$]

    /**
     * @param $Query$    父级
     * @param mergeQuery 子级
     * @param extraCmd
     * @return
     */
    private static Query buildMergeQueryFrom$Query$($Query$ $Query$, MergeQuery mergeQuery, Map<String, Object> extraCmd) {
        List<$Query$> $Query$NodeList = new LinkedList<$Query$>();
        for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : mergeQuery.getNodeList()) {
            // 在未做shard之前就有存在mergeNode的可能，要识别出来
            // 比如OR条件，可能会被拆分为两个Query的Merge，这个经过build之后，会是Merge套Merge或者是Merge套Query
            if (!(node instanceof MergeQuery) && node.getExtra() != null) {
                $Query$ copySelf = $Query$.copySelf();// 只复制自己，不复制子节点
                copySelf.setFirstSubNodeQueryNode((Query) node);
                copySelf.setDataNodeId(node.getDataNodeId());
                copySelf.setExtra(node.getExtra());
                copySelf.setBroadcast(node.isBroadcast());
                $Query$NodeList.add(copySelf);
            }
        }

        if ($Query$NodeList.size() > 1) {
            MergeQuery mergeQuery1 = new MergeQuery();
            mergeQuery1.setAlias($Query$.getAlias());
            mergeQuery1.setSubQueryAndSetNeedBuild($Query$.isSubQuery());
            mergeQuery1.setTableSubAliasAndSetNeedBuild($Query$.getSubAlias());
            // 复制子节点的limit/to信息
            mergeQuery1.setLimitFrom($Query$.getLimitFrom());
            mergeQuery1.setLimitTo($Query$.getLimitTo());
            mergeQuery1.setSubQueryId($Query$.getSubQueryId());
            for ($Query$ $$Query$Node : $Query$NodeList) {
                mergeQuery1.merge($$Query$Node);
            }

            mergeQuery1.setDataNodeId($Query$NodeList.get(0).getDataNodeId());
            mergeQuery1.setExtra($Query$NodeList.get(0).getExtra());
            mergeQuery1.build();
            return mergeQuery1;
        } else if ($Query$NodeList.size() == 1) {
            return $Query$NodeList.get(0);
        } else {
            return $Query$;//mergeQueryNode没有query节点 则不用优化.merge / query只能是其中的一种（步调一致）。这种情况代表子节点类型是MergeQueryNode
        }
    }

    //[where]

    /**
     * 根据执行的目标节点，构建MergeNode
     */
    private static Query buildMergeQueryFromTableQuery(TableQuery tableQuery, List<DataNodeDataScatterInfo> dataNodeDataScatterInfoList, Map<String, Object> extraCmd, boolean traceIn) {
        long maxRowCount = 0;
        String maxRowCountDataNodeId = dataNodeDataScatterInfoList.get(0).getDataNodeId();
        //
        List<List<Query>> queryNodeListList = new ArrayList<List<Query>>();
        // 单库单表是大多数场景，此时无需复制执行计划
        // 大部分情况只有一张表
        boolean needCopy = false;
        if (dataNodeDataScatterInfoList.size() > 0 && dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap().size() > 0) {
            Map<String, ColumnNameToValueSetMap> tableNameToColumnNameToValueSetMapMap = dataNodeDataScatterInfoList.get(0).getTableNameToColumnNameToValueSetMapMap();
            Entry<String, ColumnNameToValueSetMap> entry = tableNameToColumnNameToValueSetMapMap.entrySet().iterator().next();
            //needCopy
            // 如果存在in，则必须使用复制
            if (traceIn && entry.getValue() != null) {
                needCopy |= checkExistInInAndUpdateInFilterValueList(tableQuery.getIndexQueryKeyFilter(), entry.getValue().getColumnNameToValueSetMap(), true);
                needCopy |= checkExistInInAndUpdateInFilterValueList(tableQuery.getResultFilter(), entry.getValue().getColumnNameToValueSetMap(), true);
            }
            //配置的shareMode是true,由此推断淘宝团队希望的是尽可能走share模式
            if (!chooseShareMode(extraCmd) && tableNameToColumnNameToValueSetMapMap.size() > 1) {
                // 如果不使用share模式，并且多于一张表，那就采用复制模式
                needCopy = true;
            }
            //!needCopy=share模式
            if (!needCopy && tableNameToColumnNameToValueSetMapMap.size() > 1) {
                tableQuery = tableQuery.copy(); // 针对share模式，需要复制一份
            }
        }
        int index = 0;
        for (DataNodeDataScatterInfo dataNodeDataScatterInfo : dataNodeDataScatterInfoList) {
            TableQueryNodeSingleDataNodeMultipleTablesMergeInfo tableQueryNodeSingleDataNodeMultipleTablesMergeInfo = buildTableQueryNodeSingleDataNodeMultipleTablesMergeInfo(tableQuery, traceIn, needCopy, dataNodeDataScatterInfo, index);//tableQueryNode, traceIn, needCopy这三个字段代表是否存在in+使用copy模式
            if (!tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.queryList.isEmpty()) {
                queryNodeListList.add(tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.queryList);
            }
            index += dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap().size();
            if (tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.totalKeyFilteredAndValueFilteredCount > maxRowCount) {
                maxRowCount = tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.totalKeyFilteredAndValueFilteredCount;
                maxRowCountDataNodeId = dataNodeDataScatterInfo.getDataNodeId();
            }
        }
        if (queryNodeListList.isEmpty()) {
            throw new EmptyResultFilterException();
        } else if (queryNodeListList.size() == 1 && queryNodeListList.get(0).size() == 1) {
            return queryNodeListList.get(0).get(0); // 只有单库
        } else {
            // 多库执行
            MergeQuery mergeQueryNode = new MergeQuery();
            mergeQueryNode.setAlias(tableQuery.getAlias());
            mergeQueryNode.setSubQueryAndSetNeedBuild(tableQuery.isSubQuery());
            mergeQueryNode.setTableSubAliasAndSetNeedBuild(tableQuery.getSubAlias());
            mergeQueryNode.setDataNodeId(maxRowCountDataNodeId);
            mergeQueryNode.setSubQueryId(tableQuery.getSubQueryId());
            for (List<Query> queryList : queryNodeListList) {
                for (Query query : queryList) {
                    mergeQueryNode.merge(query);
                }
            }
            mergeQueryNode.setBroadcast(false);// merge不可能是广播表
            mergeQueryNode.build();// build过程中会复制子节点的信息
            return mergeQueryNode;
        }
    }

    private static boolean chooseShareMode(Map<String, Object> extraCmd) {
        return true;
    }

    /**
     * 构建单库的执行节点
     * <p>
     * 这三个字段代表是否存在in+使用copy模式,in的时候必须copy
     *
     * @param tableQueryNode
     * @param traceIn
     * @param needCopy
     */
    private static TableQueryNodeSingleDataNodeMultipleTablesMergeInfo buildTableQueryNodeSingleDataNodeMultipleTablesMergeInfo(TableQuery tableQueryNode, boolean traceIn, boolean needCopy, //
                                                                                                                                DataNodeDataScatterInfo dataNodeDataScatterInfo, int baseIndex) {
        long totalKeyFilteredAndValueFilteredCount = 0;
        TableQueryNodeSingleDataNodeMultipleTablesMergeInfo tableQueryNodeSingleDataNodeMultipleTablesMergeInfo = new TableQueryNodeSingleDataNodeMultipleTablesMergeInfo();
        Map<String, ColumnNameToValueSetMap> tableNameToFiledMap = dataNodeDataScatterInfo.getTableNameToColumnNameToValueSetMapMap();
        int index = baseIndex;
        for (String tableName : tableNameToFiledMap.keySet()) {
            TableQuery tableQueryNodeOrCopy = tableQueryNode;
            if (needCopy) {
                tableQueryNodeOrCopy = tableQueryNode.copy();
                // trace in 在分库不分表，和全表扫描时无法使用
                if (traceIn && tableNameToFiledMap.get(tableName) != null && tableNameToFiledMap.get(tableName).getColumnNameToValueSetMap() != null) {
                    checkExistInInAndUpdateInFilterValueList(tableQueryNodeOrCopy.getIndexQueryKeyFilter(), tableNameToFiledMap.get(tableName).getColumnNameToValueSetMap(), false);
                    checkExistInInAndUpdateInFilterValueList(tableQueryNodeOrCopy.getResultFilter(), tableNameToFiledMap.get(tableName).getColumnNameToValueSetMap(), false);
                }
            }
            if (needCopy) {
                tableQueryNodeOrCopy.setActualTableName(tableName);
                tableQueryNodeOrCopy.setDataNodeId(dataNodeDataScatterInfo.getDataNodeId());
                tableQueryNodeOrCopy.setExtra(getIdentifierExtra((TableQueryWithIndex) tableQueryNodeOrCopy, 0));// 设置标志
                tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.queryList.add(tableQueryNodeOrCopy);
            } else {
                tableQueryNodeOrCopy.setActualTableName(index, tableName);
                tableQueryNodeOrCopy.setDataNodeId(index, dataNodeDataScatterInfo.getDataNodeId());
                tableQueryNodeOrCopy.setExtra(index, getIdentifierExtra((TableQueryWithIndex) tableQueryNodeOrCopy, index));// 设置标志
                // 添加一个代理对象
                tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.queryList.add((Query) new ShareDelegate(tableQueryNodeOrCopy, index).getProxyClass());
                index = index + 1;
            }
            // 暂时先用逻辑表名，以后可能是索引名
            String indexName = null;
            if (tableQueryNodeOrCopy instanceof TableQueryWithIndex) {
                indexName = ((TableQueryWithIndex) tableQueryNodeOrCopy).getIndexName();
            } else {
                indexName = tableQueryNodeOrCopy.getTableMetaData().getPrimaryKeyIndexMetaData().getIndexName();
            }
            tableQueryNodeOrCopy.setBroadcast(OptimizerContext.getOptimizerContext().getRouteOptimizer().isBroadCast(indexName));
            totalKeyFilteredAndValueFilteredCount += CostEsitimaterFactory.estimate(tableQueryNodeOrCopy).getKeyFilteredAndValueFilteredCount();
        }
        tableQueryNodeSingleDataNodeMultipleTablesMergeInfo.totalKeyFilteredAndValueFilteredCount = totalKeyFilteredAndValueFilteredCount;
        return tableQueryNodeSingleDataNodeMultipleTablesMergeInfo;
    }

    /**
     * 更新 in filter 的value list
     *
     * @param dryRunAndNotSetValueList 干跑&&不进行真实的设值
     */
    private static boolean checkExistInInAndUpdateInFilterValueList(Filter filter, Map<String, Set<Object>> columnNameToValueSetMap, boolean dryRunAndNotSetValueList) {
        if (filter == null || columnNameToValueSetMap == null) {
            return false;
        }
        if (filter instanceof BooleanFilter) {
            if (filter.getOperation().equals(Operation.in)) {
                Item item = (Item) ((BooleanFilter) filter).getColumn();
                Set<Object> valueList = columnNameToValueSetMap.get(item.getColumnName());
                if (valueList != null && !valueList.isEmpty()) {
                    if (!dryRunAndNotSetValueList) {
                        // 可能只是验证下是否需要做in处理
                        ((BooleanFilter) filter).setValueList(new ArrayList(valueList));
                    }
                    return true;
                }
            }
        } else if (filter instanceof OrsFilter) {
            boolean existIn = false;
            for (Filter $filter : ((OrsFilter) filter).getFilterList()) {
                if ($filter.getOperation().equals(Operation.in)) {
                    Item item = (Item) ((BooleanFilter) $filter).getColumn();
                    Set<Object> valueSet = columnNameToValueSetMap.get(item.getColumnName());
                    if (valueSet != null && !valueSet.isEmpty()) {// 走了规则,在in中有sourceTrace
                        if (!dryRunAndNotSetValueList) {
                            // 可能只是验证下是否需要做in处理
                            ((BooleanFilter) $filter).setValueList(new ArrayList(valueSet));
                        }
                        existIn = true;
                    }
                }
            }
            return existIn;
        } else if (filter instanceof LogicalOperationFilter) {
            boolean existIn = false;
            for (Filter $filter : ((LogicalOperationFilter) filter).getFilterList()) {
                if ($filter.getOperation().equals(Operation.in)) {
                    Item item = (Item) ((BooleanFilter) $filter).getColumn();
                    Set<Object> valueSet = columnNameToValueSetMap.get(item.getColumnName());
                    if (valueSet != null && !valueSet.isEmpty()) {// 走了规则,在in中有sourceTrace
                        if (!dryRunAndNotSetValueList) {
                            // 可能只是验证下是否需要做in处理
                            ((BooleanFilter) $filter).setValueList(new ArrayList(valueSet));
                        }
                        existIn = true;
                    }
                }
            }
            return existIn;
        }

        return false;
    }


    //[Join]
    private static boolean chooseJoinMergeJoinByRule(Map<String, Object> extraCmd) {
        return ExtraCmd.getExtraCmdBoolean(extraCmd, ConnectionProperties.JOIN_MERGE_JOIN_JUDGE_BY_RULE, true);
    }

    private static boolean chooseJoinMergeJoinForce(Map<String, Object> extraCmd) {
        return ExtraCmd.getExtraCmdBoolean(extraCmd, ConnectionProperties.JOIN_MERGE_JOIN, false);
    }


    /**
     * 判断是否为单库单表 join 单库多表
     */
    private static boolean aNodeIsSingleDataBaseSingleTableAndANodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(Query left, Query right) {
        return leftNodeIsSingleDataBaseSingleTableAndRightNodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(left, right) || leftNodeIsSingleDataBaseSingleTableAndRightNodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(right, left);
    }

    /**
     * 左表是单库单表，右表为单库多表，是否满足join on group条件
     */
    private static boolean leftNodeIsSingleDataBaseSingleTableAndRightNodeIsSingleDataBaseMultipleTablesOnTheSameDataNode(Query leftNode, Query rightNode) {
        boolean leftIsSingleTable = !(leftNode instanceof MergeQuery);
        if (!leftIsSingleTable) {// 左表不是merge,而是单表
            return false;
        }
        boolean rightIsMultipleTables = rightNode instanceof MergeQuery;
        if (!(rightIsMultipleTables)) {// 右表是merge,不是单表
            return false;
        }
        String leftDataNodeId = checkAndGetOnlyOneDataNodeId(leftNode);
        String rightDataNodeId = checkAndGetOnlyOneDataNodeId(rightNode);
        if (leftDataNodeId != null && rightDataNodeId != null) {
            if (leftDataNodeId.equals(rightDataNodeId)) {
                leftNode.setBroadcast(true); // 将左表设置为广播表
                return true;
            }
        }

        return false;
    }

    /**
     * 如果只在一个节点则返回不为空的节点id
     */
    private static String checkAndGetOnlyOneDataNodeId(Query query) {
        String dataNodeId = query.getDataNodeId();
        for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : query.getNodeList()) {
            if (node instanceof MergeQuery) {
                return null;
            }
            if (!dataNodeId.equals(node.getDataNodeId())) {
                return null;
            }
        }
        return dataNodeId;
    }

    private static Query buildJoinQueryMerge(Join join, Query leftQuery, Query rightQuery, Map<String, Object> extraCmd) {
        // 根据表名的后缀来判断两个表是不是对应的表
        // 底下的节点已经被优先处理
        // 1. 如果是KvIndexNode，没必要转化为join sort join，直接join即可
        // 2. 如果是QueryNode，底下已经将其转化为merge下套query+child
        // 所以，只要处理MergeNode即可
        Map<String, Query> leftIdentifierExtras = new HashMap<String, Query>();
        List<Query> leftQueryList = new ArrayList<Query>();
        List<Query> rightQueryList = new ArrayList<Query>();
        boolean leftQueryIsBroadCast = false;
        boolean rightQueryIsBroadCast = false;
        if (leftQuery instanceof MergeQuery) {
            for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : leftQuery.getNodeList()) {
                leftIdentifierExtras.put((String) node.getExtra(), (Query) node);
                leftQueryList.add((Query) node);
            }
        } else {
            // 可能是广播表
            leftQueryIsBroadCast = leftQuery.isBroadcast();
        }
        if (rightQuery instanceof MergeQuery) {
            for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : rightQuery.getNodeList()) {
                rightQueryList.add((Query) node);
            }
        } else {
            // 可能是广播表
            rightQueryIsBroadCast = rightQuery.isBroadcast();
        }
        // 非广播表，并且存在聚合计算，不能展开
        if (!leftQueryIsBroadCast && (leftQuery.isExistAggregateExpression() || leftQuery.getLimitFrom() != null || leftQuery.getLimitTo() != null)) {
            return null;
        }
        // 非广播表，并且存在聚合计算，不能展开
        if (!rightQueryIsBroadCast && (rightQuery.isExistAggregateExpression() || rightQuery.getLimitFrom() != null || rightQuery.getLimitTo() != null)) {
            return null;
        }
        List<Join> joinList = new ArrayList<Join>();
        if (leftQueryIsBroadCast && rightQueryIsBroadCast) {
            return join;// 两个广播表之间的join，直接返回
        } else if (leftQueryIsBroadCast) {// 左边是广播表
            for (Query $rightQuery : rightQueryList) {
                Join $join = join.copySelf();
                Query $leftQuery = leftQuery.copy();
                setDataNodeId($leftQuery, $rightQuery.getDataNodeId());// 广播表的执行节点跟着右边走
                $join.setLeftNode($leftQuery);
                $join.setRightNode($rightQuery);
                $join.setDataNodeId($rightQuery.getDataNodeId());
                $join.setExtra($rightQuery.getExtra());
                joinList.add($join);
            }
        } else if (rightQueryIsBroadCast) {
            for (Query $leftQuery : leftQueryList) {
                Join $join = join.copySelf();
                Query $rightQuery = rightQuery.copy();
                setDataNodeId($rightQuery, $leftQuery.getDataNodeId());// 广播表的执行节点跟着右边走
                $join.setLeftNode($leftQuery);
                $join.setRightNode($rightQuery);
                $join.setDataNodeId($leftQuery.getDataNodeId());
                $join.setExtra($leftQuery.getExtra());
                joinList.add($join);
            }
        } else {
            // 根据后缀，找到匹配的表，生成join
            for (Query $rightQuery : rightQueryList) {
                Query $leftQuery = leftIdentifierExtras.remove($rightQuery.getExtra());
                if ($leftQuery == null) {
                    return null; // 转化失败，直接退回merge join merge的处理
                }
                Join $join = join.copySelf();
                $join.setLeftNode($leftQuery);
                $join.setRightNode($rightQuery);
                $join.setDataNodeId($leftQuery.getDataNodeId());
                $join.setExtra($leftQuery.getExtra()); // 因为left/right的extra相同，只要选择一个即可
                joinList.add($join);
            }
        }

        if (joinList.size() > 1) {
            MergeQuery mergeQuery = new MergeQuery();
            for (Join $joinNode : joinList) {
                mergeQuery.merge($joinNode);
            }
            mergeQuery.setSubQueryId(joinList.get(0).getSubQueryId());
            mergeQuery.setDataNodeId(joinList.get(0).getDataNodeId());
            mergeQuery.setExtra(joinList.get(0).getExtra());
            mergeQuery.build();
            return mergeQuery;
        } else if (joinList.size() == 1) {
            return joinList.get(0);
        }

        return null;
    }


    /**
     * 递归设置executeOn
     */
    private static void setDataNodeId(Dml dml, String dataNodeId) {
        Query query = dml.getWhere();
        if (query != null) {
            setDataNodeId(query, dataNodeId);
        }

        query = dml.getQuery();
        if (query != null) {
            setDataNodeId(query, dataNodeId);
        }
    }

    /**
     * 递归设置executeOn
     */
    private static void setDataNodeId(Query query, String dataNodeId) {
        for (drds.plus.sql_process.abstract_syntax_tree.node.Node node : query.getNodeList()) {
            setDataNodeId((Query) node, dataNodeId);
        }
        query.setDataNodeId(dataNodeId);
    }

    /**
     * 根据表名提取唯一标识
     *
     * <pre>
     * 1. tddl中的分库分表时，比如分16个库，每个库128张表，总共1024张表. 表的顺序为递增，从0000-1023，
     *    此时executeNode就是库名，表名可通过后缀获取，两者结合可以唯一确定一张表
     * 2. cobar中的分库分表，只会分库，不分表，每个库中的表名都一样.
     *    此时executeNode就是库名，已经可以唯一确定一张表
     * </pre>
     */
    private static String getIdentifierExtra(TableQueryWithIndex tableQueryWithIndex, int shareIndex) {
        String tableName = tableQueryWithIndex.getActualTableName(shareIndex);
        if (tableName == null) {
            tableName = tableQueryWithIndex.getIndexName();
        }
        Matcher matcher = suffixPattern.matcher(tableName);
        if (matcher.find()) {
            return (tableQueryWithIndex.getDataNodeId(shareIndex) + "_" + matcher.group());
        } else {
            return tableQueryWithIndex.getDataNodeId(shareIndex);
        }
    }


}
